# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs fio benchmarks.

Man: http://manpages.ubuntu.com/manpages/natty/man1/fio.1.html
Quick howto: http://www.bluestop.org/fio/HOWTO.txt
"""

import json
import logging
import posixpath
import re
import time
from typing import Any

from absl import flags
import jinja2
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker import units
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_benchmarks.fio import flags as fio_flags
from perfkitbenchmarker.linux_packages import fio


# Base name for fio log files; RunWithExec appends a timestamp to it.
PKB_FIO_LOG_FILE_NAME = 'pkb_fio_avg'
LOCAL_JOB_FILE_SUFFIX = '_fio.job'  # used with vm_util.PrependTempDir()
# Path on the VM where the job file is placed (and removed by CleanupVM).
REMOTE_JOB_FILE_PATH = posixpath.join(vm_util.VM_TMP_DIR, 'fio.job')
# Relative file name fio works on when the target is a file, not a device.
DEFAULT_TEMP_FILE_NAME = 'fio-temp-file'
# Mount point used by PrepareWithExec when FLAGS.scratch_dir is not set.
MOUNT_POINT = '/scratch'


# This dictionary maps legacy scenario names to dictionaries of fio settings.
# Each entry carries the job name, the fio rw kind and the block size.
# GetScenarioFromScenarioString() looks scenario strings up here first when
# fio_flags.FIO_USE_DEFAULT_SCENARIOS is set.
SCENARIOS = {
    'sequential_write': {
        'name': 'sequential_write',
        'rwkind': 'write',
        'blocksize': '512k',
    },
    'sequential_read': {
        'name': 'sequential_read',
        'rwkind': 'read',
        'blocksize': '512k',
    },
    'random_write': {
        'name': 'random_write',
        'rwkind': 'randwrite',
        'blocksize': '4k',
    },
    'random_read': {
        'name': 'random_read',
        'rwkind': 'randread',
        'blocksize': '4k',
    },
    'random_read_write': {
        'name': 'random_read_write',
        'rwkind': 'randrw',
        'blocksize': '4k',
    },
    'sequential_trim': {
        'name': 'sequential_trim',
        'rwkind': 'trim',
        'blocksize': '512k',
    },
    'rand_trim': {'name': 'rand_trim', 'rwkind': 'randtrim', 'blocksize': '4k'},
}


FLAGS = flags.FLAGS

# Modes for --fio_target_mode
AGAINST_FILE_WITH_FILL_MODE = 'against_file_with_fill'
AGAINST_FILE_WITHOUT_FILL_MODE = 'against_file_without_fill'
AGAINST_DEVICE_WITH_FILL_MODE = 'against_device_with_fill'
AGAINST_DEVICE_WITHOUT_FILL_MODE = 'against_device_without_fill'
# Modes in which fio targets a device rather than a file (see AgainstDevice()).
AGAINST_DEVICE_MODES = frozenset({
    AGAINST_DEVICE_WITH_FILL_MODE,
    AGAINST_DEVICE_WITHOUT_FILL_MODE,
})
# Modes in which the target is pre-filled before the run (see FillTarget()).
FILL_TARGET_MODES = frozenset(
    {AGAINST_DEVICE_WITH_FILL_MODE, AGAINST_FILE_WITH_FILL_MODE}
)


def AgainstDevice():
  """Report whether the configured fio target is a device.

  Returns:
    True when the fio target mode is one of AGAINST_DEVICE_MODES, False when
    the target is a file.
  """
  target_mode = fio_flags.FIO_TARGET_MODE.value
  return target_mode in AGAINST_DEVICE_MODES


def FillTarget():
  """Report whether the target must be pre-filled before running fio.

  Returns:
    True when the fio target mode is one of FILL_TARGET_MODES, False
    otherwise.
  """
  target_mode = fio_flags.FIO_TARGET_MODE.value
  return target_mode in FILL_TARGET_MODES


def FillDevice(vm, disk, fill_size, exec_path):
  """Sequentially write fill_size worth of data to a disk with fio.

  Args:
    vm: a linux_virtual_machine.BaseLinuxMixin object.
    disk: a disk.BaseDisk attached to the given vm.
    fill_size: amount of device to fill, in fio format.
    exec_path: string path to the fio executable
  """
  fill_arguments = [
      f'{exec_path}',
      f'--filename={disk.GetDevicePath()}',
      f'--ioengine={fio_flags.FIO_IOENGINE.value}',
      '--name=fill-device',
      f'--blocksize={fio_flags.FIO_FILL_BLOCK_SIZE.value}',
      '--iodepth=64',
      '--rw=write',
      '--direct=1',
      f'--size={fill_size}',
  ]
  # The fill is issued through the VM's robust remote command path.
  vm.RobustRemoteCommand(' '.join(fill_arguments))


# Name under which this benchmark's configuration is loaded (see GetConfig()).
BENCHMARK_NAME = 'fio'
# Default benchmark configuration in YAML form. It declares one VM group
# ("default") with a disk spec and enables the sar flag.
BENCHMARK_CONFIG = """
fio:
  description: Runs fio in sequential, random, read and write modes.
  vm_groups:
    default:
      vm_spec: *default_dual_core
      disk_spec: *default_500_gb
      vm_count: null
  flags:
    sar: True
"""


# Jinja2 template for the generated fio job file, rendered by
# GenerateJobFileString(). It emits a [global] section followed by one job
# section per (scenario, disks_list entry) pair. 'stonewall' is only emitted
# for the disks_list entry with index 0, and the per-disk '.<index>' suffix is
# only added to the job name when the scenario's 'target_raw_device' is True.
JOB_FILE_TEMPLATE = """
[global]
ioengine={{ioengine}}
invalidate=1
direct={{direct}}
runtime={{runtime}}
ramp_time={{ramptime}}
time_based
filename={{filename}}
do_verify=0
verify_fatal=0
group_reporting=1
percentile_list=1:5:10:20:25:30:40:50:60:70:75:80:90:95:99:99.5:99.9:99.95:99.99
{%- for parameter in parameters %}
{{parameter}}
{%- endfor %}
{%- for scenario in scenarios %}

{%- for pair in disks_list %}

[{{scenario['name']}}-io-depth-{{scenario['iodepth']}}-num-jobs-{{scenario['numjobs']}}{%- if scenario['target_raw_device'] == True %}.{{pair['index']}}{%- endif%}]
{%- if pair['index'] == 0 %}
stonewall
{%- endif%}
rw={{scenario['rwkind']}}
{%- if scenario['rwmixread'] is defined %}
rwmixread={{scenario['rwmixread']}}
{%- endif%}
{%- if scenario['rwmixwrite'] is defined %}
rwmixwrite={{scenario['rwmixwrite']}}
{%- endif%}
{%- if scenario['fsync'] is defined %}
fsync={{scenario['fsync']}}
{%- endif%}
{%- if scenario['blocksize'] is defined %}
blocksize={{scenario['blocksize']}}
{%- elif scenario['bssplit'] is defined %}
bssplit={{scenario['bssplit']}}
{%- endif%}
iodepth={{scenario['iodepth']}}
size={{scenario['size']}}
numjobs={{scenario['numjobs']}}
{%- if scenario['rate'] is defined %}
rate={{scenario['rate']}}
{%- endif%}
{%- if pair['disk_filename'] is defined %}
filename={{pair['disk_filename']}}
{%- endif%}
{%- endfor %}
{%- endfor %}
"""

SECONDS_PER_MINUTE = 60

# Known values of fio's rw parameter ("rwkind" in the scenario dictionaries).
RWKIND_SEQUENTIAL_READ = 'read'
RWKIND_SEQUENTIAL_WRITE = 'write'
RWKIND_RANDOM_READ = 'randread'
RWKIND_RANDOM_WRITE = 'randwrite'
RWKIND_SEQUENTIAL_READ_WRITE = 'rw'  # 'readwrite' is also valid
RWKIND_RANDOM_READ_WRITE = 'randrw'
RWKIND_SEQUENTIAL_TRIM = 'trim'
RWKIND_RANDOM_TRIM = 'randtrim'

# Operation fragments that may appear in a scenario string.
OPERATION_READ = 'read'
OPERATION_WRITE = 'write'
OPERATION_TRIM = 'trim'
OPERATION_READWRITE = 'readwrite'  # mixed read and writes
ALL_OPERATIONS = frozenset(
    [OPERATION_READ, OPERATION_WRITE, OPERATION_TRIM, OPERATION_READWRITE]
)

# Access pattern fragments that may appear in a scenario string.
ACCESS_PATTERN_SEQUENTIAL = 'seq'
ACCESS_PATTERN_RANDOM = 'rand'
ALL_ACCESS_PATTERNS = frozenset(
    [ACCESS_PATTERN_SEQUENTIAL, ACCESS_PATTERN_RANDOM]
)

# Map from (access pattern, operation) scenario string fragments to the rwkind
# fio parameter. Keys are always built from the ACCESS_PATTERN_* and
# OPERATION_* constants, never from RWKIND_* values.
MAP_ACCESS_OP_TO_RWKIND = {
    (ACCESS_PATTERN_SEQUENTIAL, OPERATION_READ): RWKIND_SEQUENTIAL_READ,
    (ACCESS_PATTERN_SEQUENTIAL, OPERATION_WRITE): RWKIND_SEQUENTIAL_WRITE,
    (ACCESS_PATTERN_RANDOM, OPERATION_READ): RWKIND_RANDOM_READ,
    (ACCESS_PATTERN_RANDOM, OPERATION_WRITE): RWKIND_RANDOM_WRITE,
    (
        ACCESS_PATTERN_SEQUENTIAL,
        OPERATION_READWRITE,
    ): RWKIND_SEQUENTIAL_READ_WRITE,
    (ACCESS_PATTERN_RANDOM, OPERATION_READWRITE): RWKIND_RANDOM_READ_WRITE,
    (ACCESS_PATTERN_SEQUENTIAL, OPERATION_TRIM): RWKIND_SEQUENTIAL_TRIM,
    # Random trim was accepted as an operation but had no mapping, so
    # rand_*_trim_* scenario strings were rejected.
    (ACCESS_PATTERN_RANDOM, OPERATION_TRIM): RWKIND_RANDOM_TRIM,
}

# Extra key-value fields accepted at the end of a scenario string. Any other
# key is rejected by GetScenarioFromScenarioString(). JOB_FILE_TEMPLATE refers
# to each of these fields explicitly, so the template needs to be updated
# whenever a field is added here.
FIO_KNOWN_FIELDS_IN_JINJA = [
    'rwmixread',
    'rwmixwrite',
    'fsync',
    'iodepth',  # overrides --fio_io_depths
    'numjobs',  # overrides --fio_num_jobs
]


def _IsBlockSizeASplitSpecification(blocksize_str: str) -> bool:
  """determines if a blocksize_str looks like a bssplit parameter.

  an example parameter would be:

  format is blocksize/percent:blocksize/percent:...blocksize/percent

  e.g.
  8k/28:12k/23:4k/23:16k/7:20k/2:32k/17

  This is just a heuristic.

  Args:
    blocksize_str:  either a blocksize (like 4k) or a bssplit parameter

  Returns:
    True if this is split specification, false if a single block size.
  """
  return ':' in blocksize_str


def GetScenarioFromScenarioString(scenario_string):
  """Build a fio scenario dictionary from a scenario string.

  When fio_flags.FIO_USE_DEFAULT_SCENARIOS is set, the string is first looked
  up in the legacy SCENARIOS map. Otherwise (or when it is not found there) it
  is decoded as

    accesspattern_blocksize_operation_workingset[_key-value...]

  for example rand_16k_write_100%, seq_1M_write_100% or
  rand_16k_readwrite_5TB_rwmixread-65.

  Args:
    scenario_string: the scenario name or encoded scenario description.

  Returns:
    A new dictionary (safe for the caller to mutate). Decoded scenarios
    contain 'name', 'rwkind', 'size', either 'blocksize' or 'bssplit', and any
    extra key-value fields; legacy scenarios are returned as a copy of their
    SCENARIOS entry.

  Raises:
    errors.Setup.InvalidFlagConfigurationError: if the string has fewer than
      four fields, uses an unknown access pattern or operation, the
      combination cannot be mapped to a fio rw kind, or an extra field is
      malformed or not listed in FIO_KNOWN_FIELDS_IN_JINJA.
  """
  # look for legacy entries in the scenario map first
  if fio_flags.FIO_USE_DEFAULT_SCENARIOS.value:
    legacy_scenario = SCENARIOS.get(scenario_string)
    if legacy_scenario:
      # return a copy so that the scenario can be mutated further if needed
      # without modifying the SCENARIOS map
      return legacy_scenario.copy()

  fields = scenario_string.split('_')
  if len(fields) < 4:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'Unexpected Scenario string format: {scenario_string}'
    )
  access_pattern, blocksize_str, operation, workingset_str = fields[0:4]

  if access_pattern not in ALL_ACCESS_PATTERNS:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'Unexpected access pattern {access_pattern} '
        f'in scenario {scenario_string}'
    )

  if operation not in ALL_OPERATIONS:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'Unexpected operation {operation} in scenario {scenario_string}'
    )

  rwkind = MAP_ACCESS_OP_TO_RWKIND.get((access_pattern, operation))
  if not rwkind:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'{access_pattern} and {operation} could not be mapped '
        'to a rwkind fio parameter from '
        f'scenario {scenario_string}'
    )

  # required fields of JOB_FILE_TEMPLATE
  result = {
      'name': scenario_string.replace(',', '__'),
      'rwkind': rwkind,
      'size': workingset_str,
  }

  if _IsBlockSizeASplitSpecification(blocksize_str):
    result['bssplit'] = blocksize_str
  else:
    result['blocksize'] = blocksize_str

  # The first four fields are well defined - after that, key-value pairs
  # encode any extra fields, e.g. rand_16k_readwrite_5TB_rwmixread-65:
  #          random access pattern
  #          16k block size
  #          readwrite operation
  #          5 TB working set
  #          rwmixread of 65. (so 65% reads and 35% writes)
  for extra_field in fields[4:]:
    # Split on the first '-' only, so the value keeps any further dashes and
    # a field without a value is reported instead of raising IndexError.
    key, separator, value = extra_field.partition('-')
    if not separator or not value:
      raise errors.Setup.InvalidFlagConfigurationError(
          f'Malformed extra field "{extra_field}" in scenario '
          f'{scenario_string}; expected the form key-value'
      )
    if key not in FIO_KNOWN_FIELDS_IN_JINJA:
      raise errors.Setup.InvalidFlagConfigurationError(
          'Unrecognized FIO parameter {} out of scenario {}'.format(
              key, scenario_string
          )
      )
    result[key] = value

  return result


def GenerateJobFileString(
    filename: str,
    scenario_strings: list[str],
    io_depths: list[int] | None,
    num_jobs: list[int] | None,
    working_set_size: int | None,
    block_size: Any,
    runtime: int,
    ramptime: int,
    direct: bool,
    parameters: list[str],
    raw_files: list[str],
    require_merge: bool,
) -> str:
  """Make a string with our fio job file.

  Args:
    filename: the file or disk we pre-filled, if any.
    scenario_strings: list of scenario strings, either names in SCENARIOS or
      strings understood by GetScenarioFromScenarioString.
    io_depths: iterable of integers. The IO queue depths to test. Only used
      when at least one scenario lacks its own iodepth or numjobs.
    num_jobs: iterable of integers. The number of fio processes to test. Only
      used when at least one scenario lacks its own iodepth or numjobs.
    working_set_size: int or None. If int, the size of the working set in GB.
    block_size: Quantity or None. If quantity, the block size to use.
    runtime: int. The number of seconds to run each job.
    ramptime: int. The number of seconds to run the specified workload before
      logging any performance numbers.
    direct: boolean. Whether to use direct IO.
    parameters: list. Other fio parameters to be applied to all jobs.
    raw_files: A list of raw device paths.
    require_merge: Whether the jobs will be merged to be reported later.

  Returns:
    The contents of a fio job file, as a string.
  """

  if 'all' in scenario_strings and fio_flags.FIO_USE_DEFAULT_SCENARIOS.value:
    # Copy every legacy scenario: the loop below writes 'blocksize' and 'size'
    # into the scenario dictionaries, and that must not leak into the
    # module-level SCENARIOS map shared by later calls.
    scenarios = [scenario.copy() for scenario in SCENARIOS.values()]
  else:
    scenarios = [
        GetScenarioFromScenarioString(scenario_string.strip('"'))
        for scenario_string in scenario_strings
    ]

  default_size_string = (
      str(working_set_size) + 'G' if working_set_size else '100%'
  )
  blocksize_override = (
      str(int(block_size.m_as(units.byte))) + 'B' if block_size else None
  )

  for scenario in scenarios:
    # per legacy behavior, the block size parameter
    # overrides what is defined in the scenario string
    if blocksize_override:
      scenario['blocksize'] = blocksize_override

    # per legacy behavior, the size in the scenario_string overrides
    # size defined on the command line
    if 'size' not in scenario:
      scenario['size'] = default_size_string

  # The scenario-provided iodepth/numjobs are only honored when every
  # scenario supplies both; otherwise the flag values are used for all.
  should_use_scenario_iodepth_numjobs = all(
      'iodepth' in scenario and 'numjobs' in scenario for scenario in scenarios
  )

  jinja_scenarios = []
  if should_use_scenario_iodepth_numjobs:
    # All scenarios supply iodepth and numjobs.
    # Remove iodepth and numjobs from scenario name to prevent redundancy,
    # since the job section title in the template already includes both.
    for scenario in scenarios:
      scenario['name'] = scenario['name'].replace(  # pytype: disable=attribute-error
          f'_iodepth-{scenario["iodepth"]}', ''
      )
      scenario['name'] = scenario['name'].replace(  # pytype: disable=attribute-error
          f'_numjobs-{scenario["numjobs"]}', ''
      )

    jinja_scenarios = scenarios  # scenarios already includes iodepth, numjobs
  else:
    # preserve functionality of creating a cross product of all
    # (scenario X io_depths X num_jobs)
    # which allows a single run to produce all of these metrics
    for scenario in scenarios:
      for num_job in num_jobs:
        for io_depth in io_depths:
          scenario_copy = scenario.copy()
          scenario_copy['iodepth'] = io_depth
          scenario_copy['numjobs'] = num_job
          jinja_scenarios.append(scenario_copy)

  rate_limit = fio_flags.FIO_RATE_BANDWIDTH_LIMIT.value
  for scenario in jinja_scenarios:
    if rate_limit:
      scenario['rate'] = rate_limit
    scenario['target_raw_device'] = require_merge

  # The disk list does not depend on the scenario, so build it once. When
  # merging is required, one job section is emitted per raw device.
  if require_merge and raw_files:
    disks_list = [
        {
            'index': index,
            'disk_filename': raw_file,
        }
        for index, raw_file in enumerate(raw_files)
    ]
  else:
    disks_list = [{'index': 0}]

  job_file_template = jinja2.Template(
      JOB_FILE_TEMPLATE, undefined=jinja2.StrictUndefined
  )

  return str(
      job_file_template.render(
          ioengine=fio_flags.FIO_IOENGINE.value,
          runtime=runtime,
          ramptime=ramptime,
          filename=filename,
          scenarios=jinja_scenarios,
          direct=int(direct),
          parameters=parameters,
          disks_list=disks_list,
      )
  )


FILENAME_PARAM_REGEXP = re.compile(r'filename\s*=.*$', re.MULTILINE)


def ProcessedJobFileString(fio_jobfile_contents, remove_filename):
  """Optionally strip filename parameters from a fio job file.

  Args:
    fio_jobfile_contents: the contents of a fio job file.
    remove_filename: bool. If true, every match of FILENAME_PARAM_REGEXP is
      replaced with the empty string (the line break after it is kept).

  Returns:
    The job file as a string, possibly without filename parameters.
  """
  if not remove_filename:
    return fio_jobfile_contents
  return FILENAME_PARAM_REGEXP.sub('', fio_jobfile_contents)


def GetOrGenerateJobFileString(
    job_file_path,
    scenario_strings,
    against_device,
    disks,
    io_depths,
    num_jobs,
    working_set_size,
    block_size,
    runtime,
    ramptime,
    direct,
    parameters,
    job_file_contents,
    require_merge,
):
  """Return the fio job file to run, loading or generating it as needed.

  The user's job file (or, failing that, job_file_contents) is used when a
  job file path is given or when there are no scenario strings. Otherwise a
  job file is generated from the scenario strings.

  Args:
    job_file_path: string or None. The path to the user's job file, if provided.
    scenario_strings: list of strings or None. The workload scenarios to
      generate.
    against_device: bool. True if testing against a raw device, False if testing
      against a filesystem.
    disks: the disk.BaseDisk object(s) to test against.
    io_depths: iterable of integers. The IO queue depths to test.
    num_jobs: iterable of integers. The number of fio processes to test.
    working_set_size: int or None. If int, the size of the working set in GB.
    block_size: Quantity or None. If Quantity, the block size to use.
    runtime: int. The number of seconds to run each job.
    ramptime: int. The number of seconds to run the specified workload before
      logging any performance numbers.
    direct: boolean. Whether to use direct IO.
    parameters: list. Other fio parameters to apply to all jobs.
    job_file_contents: string contents of fio job.
    require_merge: whether jobs will need to be merged for reporting.

  Returns:
    A string containing a fio job file.
  """
  user_job_file_string = GetFileAsString(job_file_path)

  if job_file_path or not scenario_strings:
    # When running against a device, filename parameters are stripped from
    # the job file.
    contents = user_job_file_string or job_file_contents
    return ProcessedJobFileString(contents, against_device)

  device_paths = []
  if not against_device:
    # Since we pass --directory to fio, we must use relative file
    # paths or get an error.
    target = DEFAULT_TEMP_FILE_NAME
  else:
    target = ':'.join([disk.GetDevicePath() for disk in disks])
    for disk in disks:
      if disk.is_striped:
        # A striped disk contributes each of its member devices.
        device_paths.extend([member.GetDevicePath() for member in disk.disks])
      else:
        device_paths.append(disk.GetDevicePath())

  return GenerateJobFileString(
      target,
      scenario_strings,
      io_depths,
      num_jobs,
      working_set_size,
      block_size,
      runtime,
      ramptime,
      direct,
      parameters,
      device_paths,
      require_merge,
  )


NEED_SIZE_MESSAGE = (
    'You must specify the working set size when using '
    'generated scenarios with a filesystem.'
)


def WarnOnBadFlags():
  """Check flag combinations, warning about or rejecting bad ones.

  Logs a warning listing flags that are ignored because a custom job file was
  specified.

  Raises:
    errors.Benchmarks.PrepareException: if scenarios are to be generated
      against a filesystem without a job file and without a working set size.
  """
  custom_job_file = fio_flags.FIO_JOBFILE.value

  if custom_job_file:
    ignored_flags = set()
    for flag_name in fio_flags.FLAGS_IGNORED_FOR_CUSTOM_JOBFILE:
      if FLAGS[flag_name].present:
        ignored_flags.add('--' + flag_name)

    if ignored_flags:
      logging.warning(
          'Fio job file specified. Ignoring options "%s"',
          ', '.join(ignored_flags),
      )

  # The remaining check only applies to generated scenarios without a
  # user-provided job file.
  if custom_job_file is not None:
    return
  if not fio_flags.FIO_GENERATE_SCENARIOS.value:
    return
  if fio_flags.FIO_WORKING_SET_SIZE.value or AgainstDevice():
    return

  logging.error(NEED_SIZE_MESSAGE)
  raise errors.Benchmarks.PrepareException(NEED_SIZE_MESSAGE)


def GetConfig(user_config):
  """Load the fio benchmark configuration.

  Unless the target mode is AGAINST_FILE_WITHOUT_FILL_MODE, the mount point
  of every entry in the default VM group's disk_spec is cleared.

  Args:
    user_config: user supplied configuration passed through to
      configs.LoadConfig.

  Returns:
    The loaded benchmark configuration.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  if fio_flags.FIO_TARGET_MODE.value == AGAINST_FILE_WITHOUT_FILL_MODE:
    return config

  default_disk_spec = config['vm_groups']['default']['disk_spec']
  for spec_key in default_disk_spec:
    default_disk_spec[spec_key]['mount_point'] = None
  return config


def GetLogFlags(log_file_base):
  """Build the fio command line flags that enable the requested logs.

  Args:
    log_file_base: base file name substituted into every --write_*_log flag.

  Returns:
    A space separated string of fio logging flags; empty when no log flag is
    enabled.
  """
  lat_log = fio_flags.FIO_LAT_LOG.value
  bw_log = fio_flags.FIO_BW_LOG.value
  iops_log = fio_flags.FIO_IOPS_LOG.value
  # --log_avg_msec is added for latency, bandwidth or IOPS logs only.
  needs_avg_interval = lat_log or bw_log or iops_log

  flag_templates = []
  if fio_flags.FIO_LAT_LOG.value:
    flag_templates.append('--write_lat_log=%(filename)s')
  if fio_flags.FIO_BW_LOG.value:
    flag_templates.append('--write_bw_log=%(filename)s')
  if fio_flags.FIO_IOPS_LOG.value:
    flag_templates.append('--write_iops_log=%(filename)s')
  if fio_flags.FIO_HIST_LOG.value:
    flag_templates.append('--write_hist_log=%(filename)s')
  if needs_avg_interval:
    flag_templates.append('--log_avg_msec=%(interval)d')
  # Histogram logging has its own interval flag, placed last.
  if fio_flags.FIO_HIST_LOG.value:
    flag_templates.append('--log_hist_msec=%(hist_interval)d')

  substitutions = {
      'filename': log_file_base,
      'interval': fio_flags.FIO_LOG_AVG_MSEC.value,
      'hist_interval': fio_flags.FIO_LOG_HIST_MSEC.value,
  }
  return ' '.join(flag_templates) % substitutions


def CheckPrerequisites(benchmark_config):
  """Run the flag combination checks for this benchmark.

  Args:
    benchmark_config: unused.
  """
  del benchmark_config  # unused
  WarnOnBadFlags()


def Prepare(benchmark_spec):
  """Prepare every VM in benchmark_spec to run FIO, in parallel.

  Args:
    benchmark_spec: The benchmarks specification.
  """
  fio_exec_path = fio.GetFioExec()

  def _PrepareVm(vm):
    return PrepareWithExec(vm, fio_exec_path)

  background_tasks.RunThreaded(_PrepareVm, benchmark_spec.vms)


def GetFileAsString(file_path):
  """Read a resource file and return its contents.

  Args:
    file_path: path handed to data.ResourcePath, or a falsy value.

  Returns:
    The file contents as a string, or None when file_path is falsy.
  """
  if file_path:
    resolved_path = data.ResourcePath(file_path)
    with open(resolved_path) as resource_file:
      return resource_file.read()
  return None


def PrepareWithExec(vm, exec_path):
  """Prepare the virtual machine to run FIO.

  Installs the fio package, pre-fills the scratch disks when the target mode
  asks for it, and formats and mounts the single scratch disk when running
  against a file with fill.

  Args:
    vm: The virtual machine to prepare the benchmark on.
    exec_path: string path to the fio executable

  Raises:
    errors.Setup.InvalidSetupError: if several scratch disks are configured
      while the target mode tests against a file.
  """
  logging.info('FIO prepare on %s', vm)
  vm.Install('fio')

  if FillTarget():
    device_paths = [
        scratch_disk.GetDevicePath() for scratch_disk in vm.scratch_disks
    ]
    logging.info('Fill devices %s on %s', device_paths, vm)

    def _FillDisk(scratch_disk):
      return FillDevice(
          vm, scratch_disk, fio_flags.FIO_FILL_SIZE.value, exec_path
      )

    background_tasks.RunThreaded(_FillDisk, vm.scratch_disks)

  # We only need to format and mount if the target mode is against
  # file with fill because 1) if we're running against the device, we
  # don't want it mounted and 2) if we're running against a file
  # without fill, it was never unmounted (see GetConfig()).
  if len(vm.scratch_disks) > 1:
    if AgainstDevice():
      return
    raise errors.Setup.InvalidSetupError(
        f'Target mode {fio_flags.FIO_TARGET_MODE.value} tests against 1 file,'
        ' but multiple scratch disks are configured.'
    )

  scratch_disk = vm.scratch_disks[0]
  if fio_flags.FIO_TARGET_MODE.value != AGAINST_FILE_WITH_FILL_MODE:
    return

  scratch_disk.mount_point = FLAGS.scratch_dir or MOUNT_POINT
  first_disk_spec = vm.create_disk_strategy.disk_specs[0]
  vm.FormatDisk(scratch_disk.GetDevicePath(), first_disk_spec.disk_type)
  vm.MountDisk(
      scratch_disk.GetDevicePath(),
      scratch_disk.mount_point,
      first_disk_spec.disk_type,
      scratch_disk.mount_options,
      scratch_disk.fstab_options,
  )


def Run(benchmark_spec):
  """Run fio on every VM of the benchmark spec and return the samples.

  Args:
    benchmark_spec: The benchmark specification whose vms are used.

  Returns:
    The list of samples produced by RunFioOnVMs.
  """
  return RunFioOnVMs(benchmark_spec.vms)


def RunFioOnVMs(vms):
  """Run fio on all VMs in parallel and collect their samples.

  Every sample is tagged with a 'machine_instance' metadata entry holding the
  position of its VM's results in the threaded run output.

  Args:
    vms: A list of VMs to run FIO on.

  Returns:
    A list of sample.Sample objects.
  """
  fio_exec_path = fio.GetFioExec()
  fallback_job_contents = GetFileAsString(data.ResourcePath('fio.job'))
  remote_path = REMOTE_JOB_FILE_PATH

  def _RunOnVm(vm):
    return RunWithExec(vm, fio_exec_path, remote_path, fallback_job_contents)

  per_vm_samples = background_tasks.RunThreaded(_RunOnVm, vms)

  all_samples = []
  for instance_index, vm_samples in enumerate(per_vm_samples):
    for vm_sample in vm_samples:
      vm_sample.metadata['machine_instance'] = instance_index
    all_samples.extend(vm_samples)

  return all_samples


def RunWithExec(
    vm,
    exec_path,
    remote_job_file_path,
    job_file_contents,
    fio_generate_scenarios=None,
):
  """Spawn fio and gather the results. Used by Windows FIO as well.

  The job file is written locally, pushed to the vm, and fio is run there
  with JSON output. Optional fio log files are pulled back afterwards.

  Args:
    vm: vm to run the benchmark on.
    exec_path: string path to the fio executable.
    remote_job_file_path: path, on the vm, to the location of the job file.
    job_file_contents: string contents of the fio job file, used when neither
      a user job file nor scenarios are given.
    fio_generate_scenarios: list of strings with scenarios to benchmark. When
      empty or None, the value of fio_flags.FIO_GENERATE_SCENARIOS is used.

  Returns:
    A list of sample.Sample objects: the parsed fio results followed by a
    'start_time' and an 'end_time' sample.
  """
  logging.info('FIO running on %s', vm)
  if not fio_generate_scenarios:
    fio_generate_scenarios = fio_flags.FIO_GENERATE_SCENARIOS.value
  disks = vm.scratch_disks
  # Merging is required as soon as more than one scratch disk is attached.
  require_merge = len(disks) > 1
  job_file_string = GetOrGenerateJobFileString(
      fio_flags.FIO_JOBFILE.value,
      fio_generate_scenarios,
      AgainstDevice(),
      disks,
      FLAGS.fio_io_depths,
      FLAGS.fio_num_jobs,
      fio_flags.FIO_WORKING_SET_SIZE.value,
      FLAGS.fio_blocksize,
      fio_flags.FIO_RUNTIME.value,
      fio_flags.FIO_RAMPTIME.value,
      fio_flags.DIRECT_IO.value,
      fio_flags.FIO_PARAMETERS.value,
      job_file_contents,
      require_merge,
  )
  # Write the job file to the local temp dir first, then copy it to the vm.
  job_file_path = vm_util.PrependTempDir(vm.name + LOCAL_JOB_FILE_SUFFIX)
  with open(job_file_path, 'w') as job_file:
    job_file.write(job_file_string)
    logging.info('Wrote fio job file at %s', job_file_path)
    logging.info(job_file_string)

  vm.PushFile(job_file_path, remote_job_file_path)

  if AgainstDevice():
    # Only pass --filename on the command line when the job file does not
    # mention 'filename' at all (this is a plain substring check).
    if 'filename' in job_file_string:
      filename_parameter = ''
    else:
      filenames = ':'.join([disk.GetDevicePath() for disk in disks])
      filename_parameter = f'--filename={filenames}'
    fio_command = (
        f'{exec_path} --output-format=json '
        f'--random_generator={fio_flags.FIO_RNG.value} '
        f'{filename_parameter} {remote_job_file_path}'
    )
  else:
    # File mode works inside the mount point of the single scratch disk.
    assert(len(disks) == 1)
    fio_command = (
        f'{exec_path} --output-format=json '
        f'--random_generator={fio_flags.FIO_RNG.value} '
        f'--directory={disks[0].mount_point} {remote_job_file_path}'
    )

  collect_logs = any([
      fio_flags.FIO_LAT_LOG.value,
      fio_flags.FIO_BW_LOG.value,
      fio_flags.FIO_IOPS_LOG.value,
      fio_flags.FIO_HIST_LOG.value,
  ])

  log_file_base = ''
  if collect_logs:
    # The current time is appended to the base name of the log files.
    log_file_base = '%s_%s' % (PKB_FIO_LOG_FILE_NAME, str(time.time()))
    fio_command = ' '.join([fio_command, GetLogFlags(log_file_base)])

  # TODO(user): This only gives results at the end of a job run
  #      so the program pauses here with no feedback to the user.
  #      This is a pretty lousy experience.
  logging.info('FIO Results:')
  # start_time and end_time bracket the remote fio command only.
  start_time = time.time()
  stdout, _ = vm.RobustRemoteCommand(
      fio_command, timeout=fio_flags.FIO_COMMAND_TIMEOUT_SEC.value
  )
  end_time = time.time()
  bin_vals = []
  if collect_logs:
    vm.PullFile(vm_util.GetTempDir(), '%s*.log' % log_file_base)
    if fio_flags.FIO_HIST_LOG.value:
      # Count the histogram logs on the vm; their file indices start at 1.
      num_logs = int(
          vm.RemoteCommand('ls %s_clat_hist.*.log | wc -l' % log_file_base)[0]
      )
      bin_vals += [
          fio.ComputeHistogramBinVals(
              vm, '%s_clat_hist.%s.log' % (log_file_base, idx + 1)
          )
          for idx in range(num_logs)
      ]
  samples = fio.ParseResults(
      job_file_string,
      json.loads(stdout),
      log_file_base=log_file_base,
      bin_vals=bin_vals,
      skip_latency_individual_stats=(
          not fio_flags.FIO_INCLUDE_LATENCY_PERCENTILES.value
      ),
      require_merge=require_merge
  )

  # The timing samples are given the metadata of the first parsed sample.
  # NOTE(review): samples[0] raises IndexError if ParseResults returns an
  # empty list - confirm whether that can happen.
  samples.append(
      sample.Sample('start_time', start_time, 'sec', samples[0].metadata)
  )
  samples.append(
      sample.Sample('end_time', end_time, 'sec', samples[0].metadata)
  )

  # Record the fio configuration flags on every sample.
  for item in samples:
    item.metadata['fio_target_mode'] = fio_flags.FIO_TARGET_MODE.value
    item.metadata['fio_fill_size'] = fio_flags.FIO_FILL_SIZE.value
    item.metadata['fio_fill_block_size'] = fio_flags.FIO_FILL_BLOCK_SIZE.value
    item.metadata['fio_rng'] = fio_flags.FIO_RNG.value

  return samples


def Cleanup(benchmark_spec):
  """Clean up every VM of the benchmark, in parallel.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  background_tasks.RunThreaded(CleanupVM, benchmark_spec.vms)


def CleanupVM(vm):
  """Remove the fio job file and, if we created it, the fio temp file.

  Args:
    vm: The virtual machine to clean up.
  """
  logging.info('FIO Cleanup up on %s', vm)
  vm.RemoveFile(REMOTE_JOB_FILE_PATH)

  # If the user supplies their own job file, then they have to clean
  # up after themselves, because we don't know their temp file name.
  if AgainstDevice() or fio_flags.FIO_JOBFILE.value:
    return

  temp_file_path = posixpath.join(vm.GetScratchDir(), DEFAULT_TEMP_FILE_NAME)
  vm.RemoveFile(temp_file_path)
